package com.dahuan.processfunction;

import com.dahuan.bean.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class Process_KeyedProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );

        final String host = "localhost";
        final int port = 8888;
        DataStream<String> inputStream = env.socketTextStream( host, port );

        DataStream<SensorReading> dataStream = inputStream.map( data -> {
            String[] split = data.split( "," );
            return new SensorReading( split[0], new Long( split[1] ), new Double( split[2] ) );
        } );

        //测试KeyedProcessFunction 先分组然后自定义处理
        dataStream.keyBy( "id" )
                .process( new MyProcess() )
                .print();

        env.execute("Process_KeyedProcessFunction");

    }

    //实现自定义处理函数
    public static class MyProcess extends KeyedProcessFunction<Tuple,SensorReading,Integer>{

        ValueState<Long> tsTimerState;

        @Override
        public void open(Configuration parameters) throws Exception {

            tsTimerState = getRuntimeContext().getState( new ValueStateDescriptor<Long>("ts-time",Long.class) );
        }

        @Override
        public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
            out.collect( value.getId().length() );


            //TODO Context
            ctx.timestamp(); //获取当前时间戳
            ctx.getCurrentKey(); //获取正在处理的元素的键

            ctx.timerService().currentProcessingTime(); //返回当前处理时间
            ctx.timerService().currentWatermark(); //返回当前事件时间水印
            //开始的时间是伦敦时间 1970年1月1日
            ctx.timerService().registerProcessingTimeTimer( 10000L ); //注册要在处理时间超过给定时间时触发的计N时器。

            tsTimerState.update( ctx.timerService().currentProcessingTime() + 1000L );
            //ctx.timerService().deleteProcessingTimeTimer(tsTimerState.value() );

            //ctx.output(  ); //侧输出流


            ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L); //注册一个在事件时间水印超过给定时间时将要触发的计时器。
            ctx.timerService().deleteProcessingTimeTimer( 10000L ); //删除具有给定触发时间的处理时间计时器。仅当预先注册了此类计时器并且尚未到期时，此方法才有效。
            ctx.timerService().deleteEventTimeTimer( 10000L );//删除具有给定触发时间的事件时间计时器。仅当预先注册了此类计时器并且尚未到期时，此方法才有效



        }


        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {

            System.out.println(timestamp + "定时器触发" );
            ctx.getCurrentKey(); //获取触发计时器的密钥。
            ctx.timeDomain(); //触发计时器的时间

        }

        @Override
        public void close() throws Exception {
            tsTimerState.clear();
        }
    }
}
